package dslab.transfer;

import java.io.InputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.Arrays;
import java.util.concurrent.*;

import at.ac.tuwien.dsg.orvell.Shell;
import at.ac.tuwien.dsg.orvell.StopShellException;
import at.ac.tuwien.dsg.orvell.annotation.Command;
import dslab.ComponentFactory;
import dslab.core.Server;
import dslab.enums.Protocol;
import dslab.enums.Role;
import dslab.nameserver.INameserverRemote;
import dslab.util.MisconfigurationException;
import dslab.util.*;

/**
 * Feature-complete TransferServer that implements a shell and DMTP (client + server functionality).
 *
 * <p>Life cycle: the constructor validates the configuration and resolves the monitoring address
 * and the root nameserver; {@link #run()} starts the worker pool, the send queue handler and the
 * DMTP server and then blocks in the interactive shell; {@link #shutdown()} stops everything again.
 */
public class TransferServer implements ITransferServer {

    // configuration keys that have to be present; every key this class reads is listed here so
    // that a missing entry is reported as a MisconfigurationException instead of failing later
    private static final String[] REQUIRED_CONFIG_KEYS = {
        "tcp.port", "monitoring.host", "monitoring.port",
        "registry.host", "registry.port", "root_id"
    };

    // minimum size of the worker pool
    private static final int MIN_POOL_SIZE = 4;

    // seconds to wait for running tasks to finish before forcing termination
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 3;

    private final String componentId;
    private final Config config;
    private final Config domains;
    private final InputStream inputStream;
    private final PrintStream printStream;
    private final BlockingQueue<Runnable> outboundSendTasks;
    private final Pair<InetAddress, Integer> monitoring;
    private final INameserverRemote root;

    // both are created in run() and stay null until then
    private ExecutorService pool;
    private Server dmtpServer;

    /**
     * Creates a new TransferServer.
     *
     * @param componentId the id of the component that corresponds to the Config resource
     * @param config the component config
     * @param domains the domain config
     * @param in the input stream to read console input from
     * @param out the output stream to write console output to
     * @throws MisconfigurationException if one of the required configuration keys is missing
     * @throws UnknownHostException if the configured monitoring host cannot be resolved
     * @throws RemoteException if the RMI registry cannot be reached
     * @throws NotBoundException if the configured root id is not bound in the registry
     */
    public TransferServer(String componentId, Config config, Config domains, InputStream in,
                          PrintStream out) throws MisconfigurationException, IllegalArgumentException, UnknownHostException, RemoteException, NotBoundException {
        // print configuration for the benefit of the user
        System.out.printf("@%s: configuration:%n", componentId);
        for (String key : config.listKeys()) {
            System.out.printf("@%s: { %s: %s }%n", componentId, key, config.getString(key));
        }
        System.out.println("domains:");
        for (String domain : domains.listKeys()) {
            System.out.printf("@%s: { %s: %s }%n", componentId, domain, domains.getString(domain));
        }

        this.componentId = componentId;
        this.config = config;
        this.domains = domains;
        this.inputStream = in;
        this.printStream = out;
        this.outboundSendTasks = new LinkedBlockingQueue<>();

        // check if required keys are present before any of them is read
        for (String key : REQUIRED_CONFIG_KEYS) {
            if (!config.containsKey(key)) {
                throw new MisconfigurationException();
            }
        }

        // set up monitoring host and port
        this.monitoring = new Pair<>(
            InetAddress.getByName(config.getString("monitoring.host")),
            config.getInt("monitoring.port")
        );

        // look up the root nameserver in the registry for domain resolution
        this.root = (INameserverRemote) LocateRegistry.getRegistry(
            config.getString("registry.host"),
            config.getInt("registry.port")
        ).lookup(config.getString("root_id"));
    }

    /**
     * Starts the worker pool, the send queue handler and the DMTP server, then runs the
     * interactive shell on the calling thread until the shell returns.
     */
    @Override
    public void run() {
        // size the pool relative to the available cores, but never below the minimum;
        // identical to the former "cores < 4 ? 4 : cores + 1"
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        pool = Executors.newFixedThreadPool(Math.max(MIN_POOL_SIZE, availableProcessors + 1));

        // hand the send queue handler to the pool directly; wrapping it in a Thread that is
        // never started only created an unused thread object
        pool.execute(new SendQueueHandler(
            String.format("%s-send-queue-handler", componentId),
            outboundSendTasks
        ));

        // initialize the DMTP server and run it on the pool
        dmtpServer = new Server(
            String.format("%s-server", componentId),
            config.getInt("tcp.port"),
            Role.Transfer,
            Protocol.DMTP,
            domains,
            pool,
            outboundSendTasks,
            monitoring,
            root
        );
        pool.execute(dmtpServer);

        // finally instantiate the shell; this call blocks until the shell terminates
        Shell shell = new Shell(inputStream, printStream);
        shell.register(this);
        shell.setPrompt(String.format("%s> ", componentId));
        shell.run();
    }

    /**
     * Shell command that stops the DMTP server and the worker pool and then closes the shell.
     *
     * @throws StopShellException always, to signal the shell that it should stop
     */
    @Override
    @Command
    public void shutdown() throws StopShellException {
        // inspired by the oracle docs of ExecutorService
        if (pool != null) {
            // attempt closing the dmtp server
            if (dmtpServer != null) {
                dmtpServer.shutdown();
            }

            // prevent new tasks from being submitted
            pool.shutdown();

            // try to await termination of currently active tasks and optionally force termination
            try {
                if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                }
            } catch (InterruptedException exception) {
                // force termination and preserve the interrupt status for the caller
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        System.out.printf("@%s: stopped%n", componentId);

        // close shell
        throw new StopShellException();
    }

    /**
     * Entry point: creates the transfer server for the component id given as first argument
     * and runs it with the standard input and output streams.
     *
     * @param args command line arguments; {@code args[0]} is the component id
     */
    public static void main(String[] args) throws Exception {
        // without a component id there is nothing to start (and nothing to name in error output)
        if (args.length < 1) {
            System.err.println("usage: TransferServer <component-id>");
            return;
        }

        try {
            ITransferServer server = ComponentFactory.createTransferServer(args[0], System.in, System.out);
            server.run();
        } catch (MisconfigurationException exception) {
            System.err.printf("@%s: a fatal error has occurred: configuration is lacking one of the following configuration options: %s%n", args[0], Arrays.toString(REQUIRED_CONFIG_KEYS));
        } catch (StopShellException ignored) {
            // thrown by shutdown() to end the shell; nothing is left to do if it reaches this point
        }
    }
}
